/*
 * Copyright 2022-2027 中国信息通信研究院云计算与大数据研究所
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */ 
package com.service.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

import com.common.constant.ClusterConstant;
import com.common.constant.TrancfgConstant;
import com.common.context.SpringContextUtils;
import com.common.util.CommUtil;
import com.localMapper.ClustercfgMapper;
import com.mapper.ReadDataConfig;
import com.mapper.ReadTranConfig;
import com.mapper.RuninfoMapper;
import com.mapper.Salary;
import com.pojo.Clustercfg;
import com.pojo.DataConfig;
import com.pojo.Runinfo;
import com.pojo.SalaryBranchCount;
import com.pojo.Trancfg;
import com.service.SlaveNodeService;

@Service
@Scope("prototype")
public class SlaveNodeServiceImpl implements SlaveNodeService,Runnable {
	
    private final Logger logger = LoggerFactory.getLogger(SlaveNodeServiceImpl.class);
    
    private StringBuffer buf = new StringBuffer();
    @Autowired
    private ReadTranConfig readTranConfig;
    @Autowired
    private ReadDataConfig readDataCfg;
    @Autowired
    private RuninfoMapper runinfoMapper;
	@Autowired
	private ClustercfgMapper clustercfgMapper;
    @Autowired
    private Salary salary;

    private String datacfg_id = "";
    private String trancfg_testid = "";
    private String process_id = "";
    private String cluster_id = "";
    
	@Override
	public void run() {
		Clustercfg clustercfg = new Clustercfg();
		clustercfg.setCluster_id(cluster_id);
		clustercfg.setCluster_finish(ClusterConstant.SLAVE_RUN);
		clustercfgMapper.updateClustercfg(clustercfg);
		try {
			long start = System.currentTimeMillis();
	        DataConfig df = readDataCfg.readDataCfg(datacfg_id);
	        if(df == null) {
	    		clustercfg.setCluster_finish(ClusterConstant.SLAVE_STOP);
	    		clustercfgMapper.updateClustercfg(clustercfg);
	        	return;
	        }
			printDataConfigure(df);//拼装数据配置输出
			Map<String,Integer> threadConfigure = new HashMap<String,Integer>();
			Map<String,Trancfg> tfConfigure = new HashMap<String,Trancfg>();
		    List<SalaryBranchCount> branchCountList = salary.countBranchSalary();
			printRunConfigure(threadConfigure,branchCountList,tfConfigure);//拼装运行配置输出
	        //开始初始化线程配置
	        int threadNum = threadConfigure.get("transferProcNum")+threadConfigure.get("salaryProcNum")
	        				+threadConfigure.get("queryProcNum")+threadConfigure.get("depositProcNum")+
	        				threadConfigure.get("withdrawProcNum");
	        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
	        //开始启动线程
	        beginRunThread(executor,threadConfigure,df,branchCountList,tfConfigure);
	        //关闭线程池
	        executor.shutdown();
	        try {
	            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
	        } catch (InterruptedException e) {
	            logger.error(" SlaveNodeServiceImpl run thread InterruptedException {} ", e);
	        }
	        //保存runinfo
	        insertRunInfo(start);
		} catch (Exception e) {
            logger.error(" SlaveNodeServiceImpl run is error {} ", e);
		}
		clustercfg.setCluster_finish(ClusterConstant.SLAVE_STOP);
		clustercfgMapper.updateClustercfg(clustercfg);
	}

	/**
	 * 保存业务运行结果
	 * @param now
	 */
	private void insertRunInfo(long start) {
		long end = System.currentTimeMillis();
		Runinfo rf = new Runinfo(CommUtil.getRandomUUID().substring(16),process_id,datacfg_id,String.valueOf(start),
				String.valueOf(end),String.valueOf(end-start),buf.toString());	
		runinfoMapper.insertRuninfo(rf);
	}


	private void beginRunThread( ExecutorService executor, Map<String, Integer> threadConfigure, DataConfig df, List<SalaryBranchCount> branchCountList, Map<String, Trancfg> tfConfigure) {
		//转账线程启动
		for (int i = 0; i < threadConfigure.get("transferProcNum"); i++) {
			if(threadConfigure.get("transferRunNum")>0) {
	        	TransferServiceImpl transferService = SpringContextUtils.getBean(TransferServiceImpl.class);
	        	transferService.setTf(tfConfigure.get("transferTf"));
	        	transferService.setDatacfg_accnum(df.getDatacfg_accnum());
	        	transferService.setProcess_id(process_id);
	        	transferService.setNum(threadConfigure.get("transferRunNum")/threadConfigure.get("transferProcNum"));
	        	transferService.setTrancfgType(TrancfgConstant.TRANCFG_ACCOUNTS_TRANSFER);
	        	executor.submit(transferService);
			}
		}
		//代发工资线程启动
        for (int i = 0; i < threadConfigure.get("salaryProcNum"); i++) {
        	if(threadConfigure.get("salaryRunNum")>0) {
        		SalaryServiceImpl salaryServiceImpl = SpringContextUtils.getBean(SalaryServiceImpl.class);
              	salaryServiceImpl.setProcess_id(process_id);
              	int count = (int) Math.floor(threadConfigure.get("salaryRunNum")/(double)threadConfigure.get("salaryProcNum"));
              	int mod = threadConfigure.get("salaryRunNum")%threadConfigure.get("salaryProcNum");
              	salaryServiceImpl.setSalaryRepeatNum(threadConfigure.get("salaryRepeatNum"));
              	salaryServiceImpl.setProcess_id(process_id);
              	if(i<mod) {
                	salaryServiceImpl.setBranchCountList(branchCountList.subList(i*(count+1), (count+1)+i*(count+1)));
              	}else {
              		int start = mod*(count+1)+(i-mod)*count;
                	salaryServiceImpl.setBranchCountList(branchCountList.subList(start, start + count));
              	}
              	salaryServiceImpl.setTf(tfConfigure.get("salaryTf"));
            	executor.submit(salaryServiceImpl);
        	}
		}
        //账户查询线程启动
        for (int i = 0; i < threadConfigure.get("queryProcNum"); i++) {
        	if(threadConfigure.get("queryRunNum")>0) {
        		QueryAccountServiceImpl queryAccountServiceImpl = SpringContextUtils.getBean(QueryAccountServiceImpl.class);
            	queryAccountServiceImpl.setDatacfg_accnum(df.getDatacfg_accnum());
            	queryAccountServiceImpl.setNum(threadConfigure.get("queryRunNum")/threadConfigure.get("queryProcNum"));
            	queryAccountServiceImpl.setProcess_id(process_id);
            	queryAccountServiceImpl.setTf(tfConfigure.get("queryTf"));
            	executor.submit(queryAccountServiceImpl);
        	}
		}		
		//存款线程启动
		for (int i = 0; i < threadConfigure.get("depositProcNum"); i++) {
			if(threadConfigure.get("depositRunNum")>0) {
				WithdrawMoneyServiceImpl depositMoneyService = SpringContextUtils.getBean(WithdrawMoneyServiceImpl.class);
				depositMoneyService.setDatacfg_accnum(df.getDatacfg_accnum());
				depositMoneyService.setNum(threadConfigure.get("depositRunNum")/threadConfigure.get("depositProcNum"));
				depositMoneyService.setTrancfgType(TrancfgConstant.TRANCFG_ACCOUNTS_DEPOSIT);
				depositMoneyService.setProcess_id(process_id);
				depositMoneyService.setTf(tfConfigure.get("transferTf"));
	        	executor.submit(depositMoneyService);
			}
		}
		//取款线程启动
		for (int i = 0; i < threadConfigure.get("withdrawProcNum"); i++) {
			if(threadConfigure.get("withdrawRunNum")>0) {
				DepositMoneyServiceImpl withdrawMoneyService = SpringContextUtils.getBean(DepositMoneyServiceImpl.class);
				withdrawMoneyService.setDatacfg_accnum(df.getDatacfg_accnum());
				withdrawMoneyService.setNum(threadConfigure.get("withdrawRunNum")/threadConfigure.get("withdrawProcNum"));
				withdrawMoneyService.setTrancfgType(TrancfgConstant.TRANCFG_ACCOUNTS_WITHDRAW);
				withdrawMoneyService.setProcess_id(process_id);
				withdrawMoneyService.setTf(tfConfigure.get("transferTf"));
	        	executor.submit(withdrawMoneyService);
			}
		}
	}


	/**
	 * 拼裝運行配置
	 * @param threadConfigure
	 * @param branchCountList
	 * @param tfConfigure 
	 */
	private void printRunConfigure(Map<String, Integer> threadConfigure, List<SalaryBranchCount> branchCountList, Map<String, Trancfg> tfConfigure) {
		 	Trancfg param = new Trancfg();
	        param.setTrancfg_testid(trancfg_testid);
	      	List<Trancfg> tfList = readTranConfig.readTranConfig(param);
	      	threadConfigure.put("transferRunNum", 0);
	      	threadConfigure.put("transferProcNum", 0);
	      	threadConfigure.put("salaryRunNum", 0);
	      	threadConfigure.put("salaryProcNum", 0);
	      	threadConfigure.put("queryRunNum", 0);
	      	threadConfigure.put("queryProcNum", 0);
	      	threadConfigure.put("salaryRepeatNum", 0);
			//代发工资线程数初始化
	      	if(tfList!=null&&tfList.size()>0) {
	    		buf.append("运行配置：");
	      		for(Trancfg tf:tfList) {
	      			if(tf.getTrancfg_id().equals(TrancfgConstant.TRANCFG_ACCOUNTS_TRANSFER)) {
	      				buf.append("转账交易：").append(tf.getTrancfg_runnum()).append("笔").append("，");
	      				buf.append("转账线程：").append(tf.getTrancfg_procnum()).append("；");
	      		      	threadConfigure.put("transferRunNum", tf.getTrancfg_runnum());
	      		      	threadConfigure.put("transferProcNum", tf.getTrancfg_procnum());
	      		      	tfConfigure.put("transferTf",tf);
	      			}else if(tf.getTrancfg_id().equals(TrancfgConstant.TRANCFG_ACCOUNTS_SALARY)) {
	      				if(branchCountList.size()!=0) {
	      			      	threadConfigure.put("salaryRunNum", branchCountList.size());
		      			    threadConfigure.put("salaryProcNum",tf.getTrancfg_procnum());
	      				}
	      			    threadConfigure.put("salaryRepeatNum",tf.getTrancfg_runnum());
	      				int salaryTotal = 0;
	      				for(SalaryBranchCount salaryBranchCount : branchCountList) {
	      					salaryTotal+=salaryBranchCount.getNum();
	      				}
	      				buf.append("代发工资：").append(salaryTotal).append("条(").append(branchCountList.size())
	      				.append("个代发网点，重复代发").append(tf.getTrancfg_runnum()).append("次，共").append(branchCountList.size()*tf.getTrancfg_runnum()).append("笔)").append("，");
	      				buf.append("代发线程：").append(threadConfigure.get("salaryProcNum")).append("；");
	      		      	tfConfigure.put("salaryTf",tf);
	      			}else if(tf.getTrancfg_id().equals(TrancfgConstant.TRANCFG_ACCOUNTS_QUERY)) {
	      				buf.append("账户查询：").append(tf.getTrancfg_runnum()).append("笔").append("，");
	      				buf.append("查询线程：").append(tf.getTrancfg_procnum()).append("；");
	      				threadConfigure.put("queryRunNum", tf.getTrancfg_runnum());
	      		      	threadConfigure.put("queryProcNum", tf.getTrancfg_procnum());
	      		      	tfConfigure.put("queryTf",tf);
	      			}else if(tf.getTrancfg_id().equals(TrancfgConstant.TRANCFG_ACCOUNTS_DEPOSIT)) {
	      				buf.append("账户存款：").append(tf.getTrancfg_runnum()).append("笔").append("，");
	      				buf.append("存款线程：").append(tf.getTrancfg_procnum()).append("；");
	       				threadConfigure.put("depositRunNum", tf.getTrancfg_runnum());
	      		      	threadConfigure.put("depositProcNum", tf.getTrancfg_procnum());
	      		      	tfConfigure.put("depositTf",tf);
	      			}else if(tf.getTrancfg_id().equals(TrancfgConstant.TRANCFG_ACCOUNTS_WITHDRAW)) {
	      				buf.append("账户取款：").append(tf.getTrancfg_runnum()).append("笔").append("，");
	      				buf.append("取款线程：").append(tf.getTrancfg_procnum()).append("；");
	       				threadConfigure.put("withdrawRunNum", tf.getTrancfg_runnum());
	      		      	threadConfigure.put("withdrawProcNum", tf.getTrancfg_procnum());
	      		      	tfConfigure.put("withdrawTf",tf);
	      			}else if(tf.getTrancfg_id().equals(TrancfgConstant.TRANCFG_ACCOUNTS_INVENTORY)) {
	      				buf.append("资产盘点：").append(1).append("笔").append("，");
	      				buf.append("盘点线程：").append(1).append("；");
	      			}
	      		}
	      	}		
	}


	/**
	 * 拼装数据格式输出
	 * @param df
	 */
	private void printDataConfigure(DataConfig df) {
		buf.append("数据配置：");
		buf.append("网点数").append(df.getDatacfg_brhnum()).append("，");
		buf.append("科目数").append(df.getDatacfg_sjnonum()).append("，");
		buf.append("客户数").append(df.getDatacfg_cusnum()).append("，");
		buf.append("账户数").append(df.getDatacfg_accnum()).append("，");
				
	}

	public String getDatacfg_id() {
		return datacfg_id;
	}


	public void setDatacfg_id(String datacfg_id) {
		this.datacfg_id = datacfg_id;
	}


	public String getTrancfg_testid() {
		return trancfg_testid;
	}

	public void setTrancfg_testid(String trancfg_testid) {
		this.trancfg_testid = trancfg_testid;
	}

	public String getProcess_id() {
		return process_id;
	}


	public void setProcess_id(String process_id) {
		this.process_id = process_id;
	}

	public String getCluster_id() {
		return cluster_id;
	}

	public void setCluster_id(String cluster_id) {
		this.cluster_id = cluster_id;
	}
	
}